package org.codehaus.activemq.service.impl;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.jms.JMSException;
import javax.transaction.xa.XAException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.broker.Broker;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.message.ActiveMQXid;
import org.codehaus.activemq.service.Transaction;
import org.codehaus.activemq.service.TransactionManager;
import org.codehaus.activemq.store.PreparedTransactionStore;
import org.codehaus.activemq.util.JMSExceptionHelper;

public class TransactionManagerImpl implements TransactionManager {
	private static final Log log = LogFactory.getLog(TransactionManagerImpl.class);
	private Broker broker;
	private PreparedTransactionStore preparedTransactions;
	private Map activeClients = new ConcurrentHashMap();

	private Map localTxs = new ConcurrentHashMap();

	private Map xaTxs = new ConcurrentHashMap();

	private final ThreadLocal contextTx = new ThreadLocal();

	public TransactionManagerImpl(Broker broker, PreparedTransactionStore preparedTransactions) {
		this.preparedTransactions = preparedTransactions;
		this.broker = broker;
	}

	public Transaction createLocalTransaction(BrokerClient client, String txid) throws JMSException {
		AbstractTransaction t = new AbstractTransaction(this.broker) {
			private final String val$txid = txid;

			public void commit(boolean onePhase) throws XAException {
				try {
					prePrepare();
				} catch (XAException e) {
					throw e;
				} catch (Throwable e) {
					TransactionManagerImpl.log.warn("COMMIT FAILED: ", e);
					rollback();

					XAException xae = new XAException("COMMIT FAILED: Transaction rolled back.");
					xae.errorCode = 104;
					xae.initCause(e);
					throw xae;
				}

				setState((byte) 3);
				TransactionManagerImpl.this.localTxs.remove(this.val$txid);
				try {
					postCommit();
				} catch (Throwable e) {
					TransactionManagerImpl.log.warn("POST COMMIT FAILED: ", e);
					XAException xae = new XAException("POST COMMIT FAILED");
					xae.errorCode = -3;
					xae.initCause(e);
					throw xae;
				}
			}

			public void rollback() throws XAException {
				setState((byte)3);
				TransactionManagerImpl.this.localTxs.remove(this.val$txid);
				try {
					postRollback();
				} catch (Throwable e) {
					TransactionManagerImpl.log.warn("POST ROLLBACK FAILED: ", e);
					XAException xae = new XAException("POST ROLLBACK FAILED");
					xae.errorCode = -3;
					xae.initCause(e);
					throw xae;
				}
			}

			public int prepare() throws XAException {
				XAException xae = new XAException("Prepare not implemented on Local Transactions.");
				xae.errorCode = -3;
				throw xae;
			}
		};
		this.localTxs.put(txid, t);
		return t;
	}

	public Transaction createXATransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
		AbstractTransaction t = new XATransactionCommand(this.broker, xid, this.xaTxs, this.preparedTransactions);
		this.xaTxs.put(xid, t);
		return t;
	}

	public Transaction getLocalTransaction(String txid) throws JMSException {
		Transaction tx = (Transaction) this.localTxs.get(txid);
		if (tx == null) {
			throw new JMSException("Transaction '" + txid + "' has not been started.");
		}

		return tx;
	}

	public Transaction getXATransaction(ActiveMQXid xid) throws XAException {
		Transaction tx = (Transaction) this.xaTxs.get(xid);
		if (tx == null) {
			XAException e = new XAException("Transaction '" + xid + "' has not been started.");
			e.errorCode = -4;
			throw e;
		}
		return tx;
	}

	public ActiveMQXid[] getPreparedXATransactions() throws XAException {
		return this.preparedTransactions.getXids();
	}

	public void setContexTransaction(Transaction tx) {
		this.contextTx.set(tx);
	}

	public Transaction getContexTransaction() {
		return (Transaction) this.contextTx.get();
	}

	public void cleanUpClient(BrokerClient client) throws JMSException {
		List list = (List) this.activeClients.remove(client);
		if (list != null) {
			for (int i = 0; i < list.size(); i++) {
				try {
					Object o = list.get(i);
					if ((o instanceof String)) {
						Transaction t = getLocalTransaction((String) o);
						t.rollback();
					} else {
						Transaction t = getXATransaction((ActiveMQXid) o);
						t.rollback();
					}
				} catch (Exception e) {
					log.warn("ERROR Rolling back disconnected client's transactions: ", e);
				}
			}
			list.clear();
		}
	}

	public void loadTransaction(ActiveMQXid xid, Transaction transaction) throws XAException {
		if ((transaction instanceof XATransactionCommand)) {
			XATransactionCommand xaTransaction = (XATransactionCommand) transaction;
			xaTransaction.initialise(this.xaTxs, this.preparedTransactions);
		}
		transaction.setBroker(this.broker);

		this.xaTxs.put(xid, transaction);
	}

	public void start() throws JMSException {
		this.preparedTransactions.start();
		try {
			this.preparedTransactions.loadPreparedTransactions(this);
		} catch (XAException e) {
			throw JMSExceptionHelper.newJMSException("Failed to recover: " + e, e);
		}
	}

	public void stop() throws JMSException {
		this.preparedTransactions.stop();
	}

	private void addActiveTransaction(BrokerClient client, Object transactionId) {
		List list = (List) this.activeClients.get(client);
		if (list == null) {
			list = new ArrayList();
			this.activeClients.put(client, list);
		}
		list.add(transactionId);
	}

	private void removeActiveTransaction(BrokerClient client, Object transactionId) {
		List list = (List) this.activeClients.get(client);
		if (list != null)
			list.remove(transactionId);
	}
}